package com.kool.kmqtt.server;

import com.alibaba.fastjson.JSON;
import com.kool.kmqtt.server.constant.PacketTypeEnum;
import com.kool.kmqtt.server.failover.FailoverUtil;
import com.kool.kmqtt.server.log.ReceiveLog;
import com.kool.kmqtt.service.TopicConstant;
import com.kool.kmqtt.server.packet.Packet;
import com.kool.kmqtt.server.packet.PublishVariableHeader;
import com.kool.kmqtt.server.session.SessionContext;
import com.kool.kmqtt.server.session.SessionHolder;
import com.kool.kmqtt.server.task.ConnectCheckTask;
import com.kool.kmqtt.server.task.ConnectDelayed;
import com.kool.kmqtt.service.KafkaProvider;
import com.kool.kmqtt.util.DateUtil;
import com.kool.kmqtt.util.SpringUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.lang.ref.WeakReference;
import java.util.Date;

/**
 * Netty inbound handler for a single MQTT client connection.
 *
 * <p>On connect it runs the failover check, registers a {@link SessionContext}
 * in the {@link SessionHolder} and hands a {@link ConnectDelayed} entry to the
 * {@link ConnectCheckTask}. On every read it delegates the buffer to
 * {@link PacketReceiver} and, when log analysis is enabled, pushes a
 * {@link ReceiveLog} record to Kafka.
 */
@Slf4j
public class MqttServerChannelHandler extends ChannelInboundHandlerAdapter {
    private final PacketReceiver packetReceiver = new PacketReceiver();
    private final ConnectCheckTask connectCheckTask;

    /**
     * @param connectCheckTask task that receives a {@link ConnectDelayed} entry for
     *                         every newly accepted channel
     */
    public MqttServerChannelHandler(ConnectCheckTask connectCheckTask) {
        this.connectCheckTask = connectCheckTask;
    }

    /**
     * Called when the channel becomes active. Rejects the connection if the failover
     * check trips for the remote address; otherwise creates and caches the session
     * context and submits the channel to the connect-check task.
     *
     * @param ctx the channel handler context of the new connection
     * @throws Exception propagated from {@code super.channelActive}
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        String remoteAddress = ctx.channel().remoteAddress().toString();
        // Failover (circuit-breaker) check
        if (FailoverUtil.check(remoteAddress)) {
            // Threshold exceeded: drop the connection without creating a session
            log.info("客户端【{}】{}秒内连接异常次数超过{}次，断开连接",
                    remoteAddress,
                    ServerConfig.getInstance().getFailoverCounterResetSeconds(),
                    ServerConfig.getInstance().getFailoverThreshold());
            ctx.close();
            return;
        }

        // Build the session context; the session id is the channel's long id
        String sessionId = ctx.channel().id().asLongText();
        SessionContext sessionContext = new SessionContext();
        sessionContext.setRemoteAddress(remoteAddress);
        sessionContext.setSessionId(sessionId);
        // Only a weak reference to the handler context is stored in the session
        sessionContext.setCtx(new WeakReference<>(ctx));
        // Cache the session context
        SessionHolder.getInstance().put(sessionId, sessionContext);

        // Deadline = now + configured connect timeout.
        // NOTE(review): presumably ConnectCheckTask closes channels that have not
        // completed the MQTT CONNECT by this deadline - confirm in ConnectCheckTask.
        long timeout = ServerConfig.getInstance().getConnectTimeout();
        ConnectDelayed connectDelayed = new ConnectDelayed(sessionId, System.currentTimeMillis() + timeout);

        connectCheckTask.check(connectDelayed);
        log.debug("网络已连接，会话id={}", sessionId);
        super.channelActive(ctx);
    }

    /**
     * Handles one inbound buffer: delegates it to {@link PacketReceiver}, records the
     * outcome in a {@link ReceiveLog} and optionally pushes that log to Kafka.
     *
     * <p>If no session context is cached for this channel, the buffer is released and
     * the channel is closed instead of dereferencing a null session.
     *
     * @param ctx the channel handler context
     * @param msg the inbound message; cast to {@link ByteBuf} for processing
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        long t1 = System.currentTimeMillis();
        String sessionId = ctx.channel().id().asLongText();
        SessionContext sessionContext = SessionHolder.getInstance().getBySessionId(sessionId);
        if (sessionContext == null) {
            // No session is cached for this channel (e.g. channelActive rejected it
            // before registering one). Nothing downstream will consume the buffer,
            // so release it here and close the channel.
            log.warn("会话上下文不存在，断开连接，会话id={}", sessionId);
            if (msg instanceof ByteBuf) {
                ((ByteBuf) msg).release();
            }
            ctx.close();
            return;
        }

        // Receive log record
        ReceiveLog receiveLog = new ReceiveLog();
        receiveLog.setTimestamp(DateUtil.dateString(new Date()))
                .setClientId(sessionContext.getClientId())
                .setRemoteAddress(sessionContext.getRemoteAddress())
                .setUserName(sessionContext.getUserName());
        try {
            // NOTE(review): msg is not released in this method on the normal path;
            // confirm that PacketReceiver.receive takes ownership of the buffer.
            ByteBuf in = (ByteBuf) msg;
            // Receive and process the packet
            Packet packet = packetReceiver.receive(ctx, in);
            int packetType = packet.getFixedHeader().getPacketType();
            receiveLog.setPacketType(Integer.toString(packetType));
            if (packetType == PacketTypeEnum.PUBLISH.getCode()) {
                // For PUBLISH packets also record the topic name
                PublishVariableHeader publishVariableHeader = (PublishVariableHeader) packet.getVariableHeader();
                receiveLog.setTopicName(publishVariableHeader.getTopicName());
            }
            receiveLog.setIsSuccess("1");
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            // Close the session
            SessionHolder.close(sessionContext);
            // Count the failure for the failover check
            FailoverUtil.count(sessionContext.getRemoteAddress());
            receiveLog.setIsSuccess("0");
        } finally {
            receiveLog.setCostTime(System.currentTimeMillis() - t1);
            pushReceiveLog(receiveLog);
        }
    }

    /**
     * Pushes the receive log to Kafka when the log-analysis switch is on. Failures
     * are logged and never propagated, so logging cannot break packet handling.
     */
    private void pushReceiveLog(ReceiveLog receiveLog) {
        if (!ServerConfig.getInstance().getLogAnalysisSwitch()) {
            return;
        }
        try {
            KafkaProvider kafkaProvider = SpringUtil.getBean(KafkaProvider.class);
            // Multi-tenant mode redefines the topic as: tenantId_receive_log
            kafkaProvider.sendToKafka(TopicConstant.TOPIC_SUFFIX_RECEIVE_LOG, JSON.toJSONString(receiveLog));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Forwards the read-complete event to the next handler unchanged.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    /**
     * Logs the disconnect and forwards the event.
     *
     * <p>NOTE(review): the session context is not removed from {@link SessionHolder}
     * here; confirm that cleanup happens elsewhere.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("网络已断开，会话id={}", ctx.channel().id().asLongText());
        super.channelInactive(ctx);
    }

    /**
     * Logs the exception and closes the connection.
     *
     * @param ctx   the channel handler context
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("发生异常");
        // Close the connection when an exception is raised.
        log.error(cause.getMessage(), cause);
        ctx.close();
    }

}
